package com.atguigu.flink.window;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import com.atguigu.flink.utils.MyUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * Created by Smexy on 2023/11/13
 *
 * <p>Notes on how windows can be classified:
 * <ul>
 *   <li>Type: time-based, count-based</li>
 *   <li>Shape: tumbling, sliding, session</li>
 *   <li>Computation parallelism: global (non-keyed), keyed</li>
 * </ul>
 *
 * <p>Demonstrates keyed time windows (tumbling, sliding, session). As written, the
 * pipeline uses a tumbling processing-time window; the assigner passed to
 * {@code window(...)} is the piece to swap when trying the other shapes.
 */
public class Demo4_KeyedTimeWindow
{
    /** Value written to the {@code rest.port} configuration key. */
    private static final int REST_PORT = 3333;

    /** Host of the text socket the job reads its input lines from. */
    private static final String SOURCE_HOST = "hadoop102";

    /** Port of the text socket the job reads its input lines from. */
    private static final int SOURCE_PORT = 8888;

    /** Length of each tumbling window, in seconds. */
    private static final long WINDOW_SIZE_SECONDS = 5L;

    /**
     * Builds and runs the demo job: socket text source, map to {@link WaterSensor},
     * key by sensor id, 5-second tumbling processing-time window, print one line per
     * key and window.
     *
     * @param args command-line arguments; not used
     * @throws IllegalStateException if job execution fails; the original failure is
     *                               attached as the cause
     */
    public static void main(String[] args) {

        Configuration conf = new Configuration();
        conf.setInteger("rest.port", REST_PORT);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        // Default parallelism for operators that do not set their own.
        env.setParallelism(2);

        env
            .socketTextStream(SOURCE_HOST, SOURCE_PORT)
            // NOTE(review): presumably parses each input line into a WaterSensor;
            // confirm against WaterSensorMapFunction.
            .map(new WaterSensorMapFunction())
            .keyBy(WaterSensor::getId)
            // Keyed time window.
            .window(
                TumblingProcessingTimeWindows.of(Time.seconds(WINDOW_SIZE_SECONDS))
            )
            .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>()
            {
                /**
                 * Emits a single string for the given key and window, built from the
                 * key, the window and the elements collected in that window.
                 */
                @Override
                public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                    TimeWindow window = context.window();
                    // NOTE(review): MyUtil helpers are assumed to render the window
                    // bounds and the buffered elements as text; confirm in MyUtil.
                    out.collect(key +": "+MyUtil.parseTimeWindow(window)+":"+MyUtil.parseToList(elements));
                }
            })
            // The window operator gets its own parallelism (10) instead of the
            // environment default (2) set above.
            .setParallelism(10)
            .print();

        try {
            env.execute();
        } catch (Exception e) {
            // Propagate instead of only printing: a failed job must not look like
            // a normal return from main to whoever launched it.
            throw new IllegalStateException("Flink job execution failed", e);
        }
    }
}
